package com.data.dev.elasticsearch;

import com.data.dev.common.javabean.BaseBean;
import com.data.dev.key.ConfigurationKey;
import com.data.dev.key.ElasticSearchKey;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Shared helper for writing Flink streaming results to Elasticsearch.
 *
 * <p>The connection settings (host, port, username, password) are read once, when this class is
 * initialized, from {@link ConfigurationKey#getApplicationProps()}.
 *
 * <p>Created 2022-06-17 15:15:06.
 *
 * @author wangxiaoming-ghq
 */
@Slf4j
public class SinkToEs extends BaseBean {
    public static final long serialVersionUID = 2L;

    /**
     * Name of the document field that carries the stream element.
     * NOTE(review): "data" follows the Flink Elasticsearch connector documentation example;
     * confirm it matches the mapping of the target indices.
     */
    private static final String DATA_FIELD = "data";

    private static final HashMap<String,String> ES_PROPS_MAP = ConfigurationKey.getApplicationProps();
    private static final String HOST = ES_PROPS_MAP.get(ConfigurationKey.ES_HOST);
    private static final String PASSWORD = ES_PROPS_MAP.get(ConfigurationKey.ES_PASSWORD);
    private static final String USERNAME = ES_PROPS_MAP.get(ConfigurationKey.ES_USERNAME);
    private static final String PORT = ES_PROPS_MAP.get(ConfigurationKey.ES_PORT);

    /**
     * Collects the Elasticsearch connection settings into a fresh map.
     *
     * <p>Created 2022-06-17 15:17:55. The settings are logged with the password masked.
     *
     * @return a new map keyed by {@link ElasticSearchKey#HOST}, {@link ElasticSearchKey#PASSWORD},
     *         {@link ElasticSearchKey#USERNAME} and {@link ElasticSearchKey#PORT}; a value is
     *         {@code null} when the corresponding property is not configured
     */
    public static HashMap<String,String > getElasticSearchInfo(){
        // Parameterized logging with separators; the password is never written to the log.
        log.info("获取ES连接信息：【 HOST={}, PORT={}, USERNAME={}, PASSWORD=******** 】", HOST, PORT, USERNAME);
        HashMap<String,String> esInfoMap = new HashMap<>();
        esInfoMap.put(ElasticSearchKey.HOST,HOST);
        esInfoMap.put(ElasticSearchKey.PASSWORD,PASSWORD);
        esInfoMap.put(ElasticSearchKey.USERNAME,USERNAME);
        esInfoMap.put(ElasticSearchKey.PORT,PORT);

        return esInfoMap;
    }

    /**
     * Creates a sink builder that indexes every incoming stream element into the given index
     * over plain HTTP, authenticating with the configured username and password.
     *
     * @param esIndexName name of the index to write to
     * @param esType      mapping type of the index to write to
     * @return the configured {@code ElasticsearchSink.Builder<String>}; the caller may tune it
     *         further before calling {@code build()}
     * @throws NumberFormatException if the configured port is missing or is not an integer
     */
    public static ElasticsearchSink.Builder<String> getEsSinkBuilder(String esIndexName,String esType){
        HashMap<String, String> esInfoMap = getElasticSearchInfo();
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost(String.valueOf(esInfoMap.get(ElasticSearchKey.HOST)), Integer.parseInt(esInfoMap.get(ElasticSearchKey.PORT)), "http"));

        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
                httpHosts,
                new ElasticsearchSinkFunction<String>() {

                    /**
                     * Wraps one stream element into an index request for the target index.
                     *
                     * @param element the stream record to persist
                     * @return an index request whose source holds the element under DATA_FIELD
                     */
                    public IndexRequest createIndexRequest(String element) {
                        Map<String, String> json = new HashMap<>();
                        // The element must be part of the source; otherwise every record
                        // would be indexed as an empty document.
                        json.put(DATA_FIELD, element);
                        return Requests.indexRequest()
                                .index(esIndexName)
                                .type(esType)
                                .source(json);
                    }

                    @Override
                    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(createIndexRequest(element));
                    }
                }
        );


        // REST client configuration for Elasticsearch, with username/password authentication.
        // The lambda captures esInfoMap, so the credentials used are the ones read above.
        RestClientFactory restClientFactory = restClientBuilder -> {
            CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(
                    AuthScope.ANY,
                    new UsernamePasswordCredentials(
                            String.valueOf(esInfoMap.get(ElasticSearchKey.USERNAME)),
                            String.valueOf(esInfoMap.get(ElasticSearchKey.PASSWORD))
                    )
            );
            restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
                httpAsyncClientBuilder.disableAuthCaching();
                return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            });
        };

        esSinkBuilder.setRestClientFactory(restClientFactory);
        return esSinkBuilder;
    }

}
